package org.codehaus.activemq.transport.udp;

import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.Packet;
import org.codehaus.activemq.message.WireFormat;
import org.codehaus.activemq.transport.AbstractTransportChannel;
import org.codehaus.activemq.util.IdGenerator;

/**
 * A transport channel that exchanges {@link Packet}s over a UDP
 * {@link DatagramSocket}.
 *
 * <p>Outgoing packets are serialized by the configured {@link WireFormat} and
 * sent to {@link #inetAddress} / {@link #port}. Incoming datagrams are read by
 * a daemon thread created in {@link #start()}, decoded by the same wire format
 * and handed to {@code doConsumePacket} of the superclass.
 */
public class UdpTransportChannel extends AbstractTransportChannel
  implements Runnable
{
  /** Size in bytes of the socket send/receive buffers and of the receive datagram. */
  private static final int SOCKET_BUFFER_SIZE = 32768;
  /** Socket timeout in milliseconds applied at the top of each pass of the read loop. */
  private static final int SO_TIMEOUT = 5000;
  private static final Log log = LogFactory.getLog(UdpTransportChannel.class);
  /** Socket used for both sending and receiving; may be supplied by the caller. */
  protected DatagramSocket socket;
  /** Destination port stamped on datagrams; a negative value leaves the packet's port untouched. */
  protected int port;
  /** Destination address stamped on datagrams. */
  protected InetAddress inetAddress;
  private final WireFormat wireFormat;
  /** Flipped to true exactly once, by {@link #stop()}. */
  private final SynchronizedBoolean closed;
  /** Flipped to true exactly once, by {@link #start()}. */
  private final SynchronizedBoolean started;
  /** Reader thread running {@link #run()}; null until {@link #start()} is called. */
  private Thread thread;
  // Not referenced anywhere else in this class; retained as in the original.
  private final IdGenerator idGenerator = new IdGenerator();
  // Not referenced anywhere else in this class; retained as in the original.
  private final Object lock;

  /**
   * Base constructor: stores the wire format and initializes the lifecycle
   * flags. The socket, address and port are left unset.
   *
   * @param wireFormat used to encode and decode packets
   */
  protected UdpTransportChannel(WireFormat wireFormat)
  {
    this.wireFormat = wireFormat;
    this.closed = new SynchronizedBoolean(false);
    this.started = new SynchronizedBoolean(false);
    this.lock = new Object();
  }

  /**
   * Creates a channel for the given location, using the URI's own port as the
   * destination port.
   *
   * @param wireFormat     used to encode and decode packets
   * @param remoteLocation URI supplying the host and port
   * @throws JMSException if the socket cannot be created or configured
   */
  public UdpTransportChannel(WireFormat wireFormat, URI remoteLocation) throws JMSException {
    this(wireFormat, remoteLocation, remoteLocation.getPort());
  }

  /**
   * Creates a channel for the given location. The socket is created via
   * {@link #createSocket(int)} with the port of {@code remoteLocation}, while
   * outgoing datagrams are addressed to {@code port}.
   *
   * @param wireFormat     used to encode and decode packets
   * @param remoteLocation URI supplying the host and the socket port
   * @param port           destination port for outgoing datagrams
   * @throws JMSException if name resolution, socket creation, buffer
   *                      configuration or {@link #connect()} fails; the
   *                      original exception is attached as the linked exception
   */
  public UdpTransportChannel(WireFormat wireFormat, URI remoteLocation, int port) throws JMSException {
    this(wireFormat);
    try {
      this.port = port;
      this.inetAddress = InetAddress.getByName(remoteLocation.getHost());
      this.socket = createSocket(remoteLocation.getPort());

      this.socket.setReceiveBufferSize(SOCKET_BUFFER_SIZE);
      this.socket.setSendBufferSize(SOCKET_BUFFER_SIZE);

      connect();
    }
    catch (Exception ioe)
    {
      // The socket was created by this constructor and the half-built channel
      // is never returned to the caller, so release it here to avoid a leak.
      closeSocketQuietly();
      throw linkedJmsException("Initialization of TransportChannel failed: " + ioe, ioe);
    }
  }

  /**
   * Creates a channel around an existing socket. The destination port and
   * address are taken from the socket itself.
   *
   * <p>The socket is owned by the caller and is therefore not closed if
   * configuring it fails.
   *
   * @param wireFormat used to encode and decode packets
   * @param socket     the socket to send and receive on
   * @throws JMSException if the socket buffer sizes cannot be set
   */
  public UdpTransportChannel(WireFormat wireFormat, DatagramSocket socket)
    throws JMSException
  {
    this(wireFormat);
    this.socket = socket;
    this.port = socket.getPort();
    this.inetAddress = socket.getInetAddress();
    try {
      socket.setReceiveBufferSize(SOCKET_BUFFER_SIZE);
      socket.setSendBufferSize(SOCKET_BUFFER_SIZE);
    }
    catch (IOException ioe) {
      throw linkedJmsException("Initialization of TransportChannel failed", ioe);
    }
  }

  /**
   * Creates a channel around an existing socket, overriding the destination
   * port reported by the socket.
   *
   * @param wireFormat used to encode and decode packets
   * @param socket     the socket to send and receive on
   * @param port       destination port for outgoing datagrams
   * @throws JMSException if the socket buffer sizes cannot be set
   */
  public UdpTransportChannel(WireFormat wireFormat, DatagramSocket socket, int port) throws JMSException {
    this(wireFormat, socket);
    this.port = port;
  }

  /**
   * Stops the channel and closes the socket. Only the first call has any
   * effect; later calls are no-ops.
   */
  public void stop()
  {
    // commit(false, true) succeeds only for the first caller.
    if (this.closed.commit(false, true)) {
      super.stop();
      try {
        this.socket.close();
      }
      catch (Exception e) {
        // Best-effort close: keep the failure visible at trace level instead
        // of dropping it.
        log.trace(toString() + " now closed", e);
      }
    }
  }

  /**
   * Starts the daemon reader thread. Only the first call has any effect.
   *
   * @throws JMSException declared by the signature; not thrown by this implementation
   */
  public void start()
    throws JMSException
  {
    if (this.started.commit(false, true)) {
      this.thread = new Thread(this, "Thread:" + toString());
      this.thread.setDaemon(true);
      this.thread.start();
    }
  }

  /**
   * Encodes the packet into a datagram and sends it on the socket.
   *
   * @param packet the packet to send
   * @throws JMSException if encoding or sending fails with an I/O error; the
   *                      cause is attached as the linked exception
   */
  public void asyncSend(Packet packet)
    throws JMSException
  {
    try
    {
      if (log.isDebugEnabled()) {
        log.debug("Sending packet: " + packet);
      }
      DatagramPacket dpacket = createDatagramPacket(packet);

      this.socket.send(dpacket);
    }
    catch (IOException e)
    {
      throw linkedJmsException("asyncSend failed " + e, e);
    }
  }

  /**
   * @return always {@code false} for this channel
   */
  public boolean isMulticast() {
    return false;
  }

  /**
   * Receive loop executed by the thread created in {@link #start()}.
   *
   * <p>Reads datagrams until the socket is closed or the channel is stopped,
   * decoding each one and passing non-null packets to {@code doConsumePacket}.
   * I/O failures are routed through {@link #doClose(Exception)}.
   */
  public void run()
  {
    while (!this.closed.get()) {
      try {
        this.socket.setSoTimeout(SO_TIMEOUT);

        DatagramPacket dpacket = createDatagramPacket();
        while (!this.socket.isClosed()) {
          // A timeout of zero makes receive() block indefinitely, so the
          // SO_TIMEOUT set above is overridden before every read.
          this.socket.setSoTimeout(0);
          this.socket.receive(dpacket);
          Packet packet = this.wireFormat.readPacket(getClientID(), dpacket);
          if (packet != null) {
            doConsumePacket(packet);
          }
        }

        log.trace("The socket peer is now closed");
        doClose(new IOException("Socket peer is now closed"));
      }
      catch (SocketTimeoutException ignored)
      {
        // A read timeout is not an error: fall through and re-check the
        // closed flag on the next pass of the outer loop.
      }
      catch (IOException e) {
        doClose(e);
      }
    }
  }

  /**
   * Creates an empty datagram with a {@link #SOCKET_BUFFER_SIZE}-byte buffer,
   * addressed to the configured destination. Used as the receive buffer in
   * {@link #run()}.
   *
   * @return a new datagram backed by a fresh buffer
   */
  protected DatagramPacket createDatagramPacket()
  {
    return addressPacket(new DatagramPacket(new byte[SOCKET_BUFFER_SIZE], SOCKET_BUFFER_SIZE));
  }

  /**
   * Encodes the packet with the wire format and addresses the resulting
   * datagram to the configured destination.
   *
   * @param packet the packet to encode
   * @return the datagram ready to be sent
   * @throws IOException  if the wire format fails to write the packet
   * @throws JMSException declared by the wire format / superclass calls
   */
  protected DatagramPacket createDatagramPacket(Packet packet)
    throws IOException, JMSException
  {
    return addressPacket(this.wireFormat.writePacket(getClientID(), packet));
  }

  /**
   * Stamps the configured destination on a datagram. The port is applied only
   * when it is non-negative; the address is always applied.
   */
  private DatagramPacket addressPacket(DatagramPacket datagram)
  {
    if (this.port >= 0) {
      datagram.setPort(this.port);
    }
    datagram.setAddress(this.inetAddress);
    return datagram;
  }

  /**
   * Reports a read failure and stops the channel, unless the channel has
   * already been closed (in which case the failure is expected and ignored).
   *
   * @param ex the failure raised by the read loop
   */
  private void doClose(Exception ex) {
    if (!this.closed.get()) {
      onAsyncException(linkedJmsException("Error reading socket: " + ex.getMessage(), ex));
      stop();
    }
  }

  /** Builds a {@link JMSException} carrying {@code cause} as its linked exception. */
  private static JMSException linkedJmsException(String message, Exception cause)
  {
    JMSException jmsEx = new JMSException(message);
    jmsEx.setLinkedException(cause);
    return jmsEx;
  }

  /**
   * Closes the socket, if one was created, without letting a close failure
   * mask the error that is already being reported.
   */
  private void closeSocketQuietly()
  {
    if (this.socket == null) {
      return;
    }
    try {
      this.socket.close();
    }
    catch (Exception e) {
      log.trace("Failed to close socket after initialization error", e);
    }
  }

  /**
   * Hook invoked at the end of the URI-based constructor, after the socket has
   * been created and configured. Does nothing in this class.
   *
   * @throws IOException if a subclass fails to connect
   */
  protected void connect() throws IOException
  {
  }

  /**
   * Creates the socket used by the URI-based constructor, bound to the given
   * port and {@link #inetAddress}.
   *
   * @param port the port to bind to
   * @return the newly created socket
   * @throws IOException if the socket cannot be opened or bound
   */
  protected DatagramSocket createSocket(int port) throws IOException {
    return new DatagramSocket(port, this.inetAddress);
  }

  /**
   * @return a description of this channel including its socket
   */
  public String toString()
  {
    return "UdpTransportChannel: " + this.socket;
  }
}